Flink: Add RowDataTaskWriter to accept equality deletions. #1818
openinx wants to merge 12 commits into apache:master from openinx:flink-cdc-writers
Conversation
public class SortedPosDeleteWriter<T> implements Closeable {
  private static final int RECORDS_NUM_THRESHOLD = 1000_000;

  private final Map<CharSequence, List<Long>> posDeletes = Maps.newHashMap();
Maybe we could just put the <path, pos> pairs into a fixed array? That seems more memory efficient.
That would be memory efficient if all of the paths that are equal are also the same reference. I like how using a map doesn't have that assumption, but it probably isn't a big concern.
this.dataWriter = new RollingFileWriter(partition);

this.enableEqDelete = equalityFieldIds != null && !equalityFieldIds.isEmpty();
if (enableEqDelete) {
Why use a delta writer if eq deletes are disabled?
I typically like to use classes that don't need to check configuration in a tight loop. This setting introduces at least one check per row. I'd prefer using either a normal task writer or a delta writer depending on whether deletes are expected in the stream.
> Why use a delta writer if eq deletes are disabled?
Because I only want to expose the BaseDeltaWriter to compute engines, I planned to make the BaseRollingWriter & RollingFileWriter & RollingEqDeleteWriter private. To implement a compute-engine-specific TaskWriter, the only things we need to do are implement the asKey and asCopiedKey methods and customize the policy that dispatches records to the DeltaWriter.
// Adding a pos-delete to replace the old filePos.
FilePos previous = insertedRowMap.put(copiedKey, filePos);
if (previous != null) {
  posDeleteWriter.delete(previous.path, previous.rowOffset, null /* TODO set non-nullable row */);
How would this set the row? Would we need to keep track of it somehow?
The straightforward way is adding a row field to FilePos that references the old inserted row, but that would hold references to all of the rows inserted within a checkpoint. If the row is large while the equality fields are small, the ideal approach is to keep only the equality fields & file position in the insertedRowMap; but if we want to attach the row when writing the pos-delete file, then memory consumption is an issue. I'm considering that we may need an embedded KV library that can spill to disk in the future.
@Override
public void close() throws IOException {
  // Moving the completed data files into task writer's completedFiles automatically.
  dataWriter.close();
Minor: dataWriter should be set to null so that it can be garbage collected and so any further calls to write will fail.
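A minimal sketch of that suggestion, reusing the dataWriter field from the snippet above (the null check is an addition for illustration, not part of the PR):

```java
@Override
public void close() throws IOException {
  if (dataWriter != null) {
    // Move the completed data files into the task writer's completedFiles,
    // then drop the reference so the writer can be garbage collected and any
    // further write() call fails fast instead of writing to a closed file.
    dataWriter.close();
    dataWriter = null;
  }
}
```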
private SortedPosDeleteWriter<T> posDeleteWriter = null;
private StructLikeMap<FilePos> insertedRowMap = null;

public BaseDeltaWriter(PartitionKey partition, List<Integer> equalityFieldIds, Schema schema) {
The list equalityFieldIds is only used in this constructor and it is used to create a projection of the schema that is passed in. I think it would be better to pass the delete schema or null in, so that we don't need each writer to create a new projection of the row schema.
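For illustration, a hedged sketch of computing that projection once at the call site with Iceberg's TypeUtil.select, so each writer receives either the delete schema or null (the helper class and method name here are made up):

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

public class DeleteSchemas {
  // Returns the equality-delete schema, or null when no equality fields are configured.
  static Schema projectDeleteSchema(Schema rowSchema, List<Integer> equalityFieldIds) {
    if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
      return null;
    }
    Set<Integer> ids = Sets.newHashSet(equalityFieldIds);
    // TypeUtil.select keeps only the listed field ids, preserving their ids and types.
    return TypeUtil.select(rowSchema, ids);
  }
}
```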
      MetadataColumns.DELETE_FILE_POS);
}

public static Schema pathPosSchema(Schema rowSchema) {
This one doesn't need to be public.
    }
  }
} catch (IOException e) {
  throw new UncheckedIOException(e);
I always like to include whatever context is available. Here, it may be helpful to know which partition writer failed. What about using `throw new UncheckedIOException("Failed to write position delete file for partition " + partitionKey, e)`?
}

public List<DeleteFile> complete() {
  flush();
I would expect this to call close rather than flush. While close just calls flush so they are equivalent right now, I think using close is better in the long term. If close is modified in the future, it is unlikely that someone will go here and make the same change.
Yeah, makes sense! It's better to use close in this complete method.
}
for (DeleteFile deleteFile : result.deleteFiles()) {
  add(deleteFile);
}
Rather than copying the loops, could this call addDataFiles and addDeleteFiles?
The dataFiles() will return an array (I defined it as an array because I wanted to avoid the serialization issue with ImmutableList), while the current addDataFiles only accepts Iterable<DataFile>. I think I can add an addDataFiles(DataFile... dataFiles).
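For example, a small sketch of such a varargs overload that just delegates to the existing Iterable version (assuming that overload lives on the same class):

```java
import java.util.Arrays;

// Varargs convenience so callers holding a DataFile[] don't need to wrap it themselves.
public void addDataFiles(DataFile... files) {
  addDataFiles(Arrays.asList(files));
}
```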
@Override
public Set<StructLike> keySet() {
  return wrapperMap.keySet().stream().map(StructLikeWrapper::get).collect(Collectors.toSet());
I think this should use a StructLikeSet, or else there is no expectation that the returned set will function properly.
Makes sense. I will provide a full unit test to cover all of those map APIs.
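A rough sketch of what that could look like with Iceberg's StructLikeSet (assuming the map keeps the struct type in a `type` field):

```java
@Override
public Set<StructLike> keySet() {
  // Return a StructLikeSet so membership checks on the returned set also
  // compare structs by value rather than by reference.
  StructLikeSet keys = StructLikeSet.create(type);
  for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
    keys.add(wrapper.get());
  }
  return keys;
}
```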
@Override
public Set<Entry<StructLike, T>> entrySet() {
  throw new UnsupportedOperationException();
I think this should be implemented.
It isn't too difficult to implement Map.Entry with a getKey method that calls StructLikeWrapper::get. This method is commonly called on maps. It's even called from putAll above in this class.
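A hedged sketch of that implementation using Guava's Maps.immutableEntry (field names follow the snippet above):

```java
@Override
public Set<Entry<StructLike, T>> entrySet() {
  // Unwrap each StructLikeWrapper key so callers see plain StructLike entries.
  return wrapperMap.entrySet().stream()
      .map(entry -> Maps.immutableEntry(entry.getKey().get(), entry.getValue()))
      .collect(Collectors.toSet());
}
```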
@Override
public boolean containsValue(Object value) {
  throw new UnsupportedOperationException();
Can't this just delegate?
@Override
public boolean containsValue(Object value) {
  return wrapperMap.containsValue(value);
}

public class SortedPosDeleteWriter<T> implements Closeable {
  private static final int RECORDS_NUM_THRESHOLD = 1000_000;

  private final Map<CharSequence, List<Long>> posDeletes = Maps.newHashMap();
This needs to use a new CharSequenceMap or Map<CharSequenceWrapper, List<Long>>.
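For illustration, a sketch of the CharSequenceWrapper variant so paths are compared by content rather than by reference (the delete(...) method here is illustrative, not the PR's exact API):

```java
private final Map<CharSequenceWrapper, List<Long>> posDeletes = Maps.newHashMap();

void delete(CharSequence path, long pos) {
  // Wrap the path so two different CharSequence instances with the same
  // characters map to the same list of positions.
  posDeletes
      .computeIfAbsent(CharSequenceWrapper.wrap(path), ignored -> Lists.newArrayList())
      .add(pos);
}
```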
@Override
public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
                                                      StructLike partition) {
  throw new UnsupportedOperationException("Cannot create equality-delete writer for generic record now.");
I intended to provide the implemented methods with complete unit test coverage, but in order to provide the PoC solution as soon as possible I didn't have time to write those tests, so I left them unsupported for now.
  @Parameterized.Parameters(name = "format = {0}")
  public static Object[] parameters() {
-   return new Object[] { "parquet", "avro" };
+   return new Object[] {"parquet", "avro"};
Nit: this line doesn't need to change. Can you revert it to avoid commit conflicts?
          .buildPositionWriter();

    case PARQUET:
      RowType flinkParquetRowType = FlinkSchemaUtil.convert(DeleteUtil.posDeleteSchema(rowSchema));
I think we should separately fix the schema that gets passed to the createWriterFunc. That's a bug.
RowType flinkParquetRowType = FlinkSchemaUtil.convert(DeleteUtil.posDeleteSchema(rowSchema));

return Parquet.writeDeletes(outputFile.encryptingOutputFile())
    .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkParquetRowType, msgType))
I don't think this will work because the writer that is returned will be wrapped by a PositionDeleteStructWriter. That would duplicate the position delete struct because this is going to produce a writer for it as well. That's why the schema passed here is a bug, like I noted above.
Yeah, this needs to be fixed. I'm looking through how Avro actually works right now and it is okay because we're calling setSchema a second time from the position writer, which basically discards the original writer with the extra record and rebuilds it with just the row schema.
I'll open a PR to fix it.
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // close all open files and emit files to downstream committer operator
-   for (DataFile dataFile : writer.complete()) {
+   for (DataFile dataFile : writer.complete().dataFiles()) {
Should this also check that there are no delete files?
Yes, it should also emit the deleteFiles to the downstream committer, and commit both delete files and data files into the iceberg table via the RowDelta API. I think that would be a separate PR to address.
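For context, a hedged sketch of the committer side using Iceberg's RowDelta API (table, dataFiles, and deleteFiles are assumed inputs collected from the upstream writer):

```java
// Apply the data files and the delete files in one atomic commit.
RowDelta rowDelta = table.newRowDelta();
for (DataFile dataFile : dataFiles) {
  rowDelta.addRows(dataFile);
}
for (DeleteFile deleteFile : deleteFiles) {
  rowDelta.addDeletes(deleteFile);
}
rowDelta.commit();
```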
}

public static class PositionDeleteStructWriter<R> extends StructWriter<PositionDelete<R>> {
Nit: this file doesn't need to change.
}

@Override
public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile outputFile, FileFormat format,
I think one of the first commits to get this in could make these small changes to both Spark and Flink to update the appender factory. We could do the same for generics as well.
@Override
protected StructLike asKey(RowData row) {
  return rowDataWrapper.wrap(row);
I think this is a bug. It doesn't extract the key fields, it just wraps the row as a StructLike. It should extract the equality fields to produce a key, probably using StructProjection.
I think the tests work because the ID column is the first column in the record.
The tests work because the StructLikeMap has a StructLikeWrapper which will only compare the equality fields, even though we provided the full columns here. The names asKey and asCopiedKey are not appropriate here; asStructLike and asCopiedStructLike would be better.
> The tests work because the StructLikeMap has a StructLikeWrapper which will only compare the equality fields
But this happens using the key schema and fields are accessed by position. Wouldn't that fail if the key schema wasn't a prefix of the row schema?
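For comparison, a minimal sketch of key extraction with Iceberg's StructProjection (the schema and deleteSchema fields are assumed to be available on the writer; the projection is a reusable wrapper, so asCopiedKey would still need a real copy):

```java
// Project only the equality fields out of the wrapped row, so keys compare
// correctly even when the key columns are not a prefix of the row schema.
private final StructProjection keyProjection = StructProjection.create(schema, deleteSchema);

@Override
protected StructLike asKey(RowData row) {
  return keyProjection.wrap(rowDataWrapper.wrap(row));
}
```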
public void write(RowData row) throws IOException {
  RowDataDeltaWriter deltaWriter;

  if (spec().fields().size() <= 0) {
You can use spec.isUnpartitioned().
case DELETE:
case UPDATE_BEFORE:
  deltaWriter.delete(row);
  break;
I'm not familiar with UPDATE_AFTER or UPDATE_BEFORE. Can you help me understand what's going on here?
For CDC events, such as a MySQL binlog for UPDATE test SET a=2, b=2 WHERE a=1 AND b=1, the flink-cdc-connectors will produce two change logs for it:
-U (a=1, b=1)
+U (a=2, b=2)
The first RowData means we need to delete the old row (1, 1); it's also called an UPDATE_BEFORE row. The second RowData means we need to insert the new row (2, 2); it's also called an UPDATE_AFTER row.
What distinguishes UPDATE_BEFORE from DELETE? Is it that it will be associated with UPDATE_AFTER so the two are combined to form a replace operation?
> Is it that it will be associated with UPDATE_AFTER so the two are combined to form a replace operation?
Yes, that's right.
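Putting the four kinds together, a sketch of the full dispatch on Flink's RowKind (assuming the delta writer exposes write(...) alongside the delete(...) shown in the diff above):

```java
switch (row.getRowKind()) {
  case INSERT:
  case UPDATE_AFTER:
    // New image of the row: write it as a regular insert.
    deltaWriter.write(row);
    break;
  case DELETE:
  case UPDATE_BEFORE:
    // Old image of the row: emit a delete for it.
    deltaWriter.delete(row);
    break;
  default:
    throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
}
```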
Tasks.foreach(deltaWriterMap.values())
    .throwFailureWhenFinished()
    .noRetry()
    .run(deltaWriter -> {
You can supply a checked exception class that will be thrown. That way, you can move the try/catch outside of the run block:
@Override
public void close() {
  try {
    Tasks.foreach(deltaWriterMap.values())
        .throwFailureWhenFinished()
        .noRetry()
        .run(RowDataDeltaWriter::close, IOException.class);
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to close writers", e);
  }
}

And you can also add a callback that receives the exception for each failure with onFailure if you want to log the exceptions individually.
@Test
public void testWriteEqualityDelete() throws IOException {
  if (format == FileFormat.ORC) {
Can you use Assume.assumeTrue? That way it doesn't look like ORC is passing. It shows up as skipped.
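A short sketch of that change with JUnit 4's Assume (the message text is just an example):

```java
// Skip the ORC case so it shows up as "skipped" rather than as a silent pass.
Assume.assumeTrue("ORC doesn't support equality deletes yet", format != FileFormat.ORC);
```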
@openinx, this looks great to me! I found a few issues to address, but I think the general design and structure is good. Should we start breaking it into smaller commits to get the changes into master?

All this work has been merged in several individual PRs; I plan to close this PR now.

@rdblue I've continued the work from here in #1802, and implemented a Flink RowDataTaskWriter to accept both insert rows and equality deletions.